[python][ray] Support multi-clause fall-through in merge_into #8115

Open
XiaoHongbo-Hope wants to merge 12 commits into apache:master from XiaoHongbo-Hope:support_multi_clause

@XiaoHongbo-Hope
Contributor

Purpose

Tests

@XiaoHongbo-Hope changed the title from "Support multi clause" to "[python][ray] Support multi-clause fall-through in merge_into" on Jun 4, 2026
Multiple WhenMatched/WhenNotMatched clauses are evaluated in order;
the first clause whose condition holds wins, and rows that fail a
clause's condition fall through to the next clause.

- Remove single-clause NotImplementedError and assert guards
- Matched path: filter per clause, track remaining by _ROW_ID
  using vectorized pc.is_in
- Not-matched path: filter per clause, use NOT(condition) for
  remaining rows
- Lazy import filter_batch only when condition is present
- Add tests: fall-through, no-match-skipped, first-wins
- Update docs with multi-clause example

- Use COALESCE(NOT(cond), TRUE) in not-matched path to preserve
  rows where condition evaluates to NULL
- test_multi_clause_first_wins now uses distinct partial updates
  per clause so the winning clause is verifiable
- Hoist filter_batch import outside _transform to avoid per-batch
  import overhead
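The fall-through semantics described in these commit messages can be illustrated with a minimal pure-Python sketch. This is not the PR's implementation, which operates on Arrow batches inside Ray; `Clause`, `route_rows`, and the row dicts are hypothetical names, and each condition is modelled as a callable returning True, False, or None (standing in for SQL NULL).

```python
from dataclasses import dataclass
from typing import Callable, Optional

Row = dict
# A condition returns True, False, or None (standing in for SQL NULL).
Condition = Callable[[Row], Optional[bool]]


@dataclass
class Clause:
    """Hypothetical stand-in for a WhenMatched / WhenNotMatched clause."""
    name: str
    condition: Optional[Condition] = None  # None means unconditional


def route_rows(rows: list, clauses: list) -> dict:
    """Assign each row to the first clause whose condition is True.

    Rows whose condition is False or None fall through to the next clause;
    rows that no clause accepts end up under the "skipped" key.
    """
    routed = {c.name: [] for c in clauses}
    remaining = list(rows)
    for clause in clauses:
        taken, left = [], []
        for row in remaining:
            ok = True if clause.condition is None else clause.condition(row)
            (taken if ok is True else left).append(row)
        routed[clause.name] = taken
        remaining = left
    routed["skipped"] = remaining
    return routed


rows = [
    {"id": 1, "age": 30},
    {"id": 2, "age": 15},
    {"id": 3, "age": 5},
    {"id": 4, "age": None},  # comparison with NULL yields NULL
]
clauses = [
    Clause("adult", lambda r: None if r["age"] is None else r["age"] > 18),
    Clause("teen", lambda r: None if r["age"] is None else r["age"] > 10),
]
result = route_rows(rows, clauses)
```

In this sketch row 1 is claimed by the first clause, row 2 falls through to the second, and rows 3 and 4 are skipped because no clause evaluates to True for them.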

Check for duplicate t._ROW_ID at the start of _transform before
clause fall-through. Prevents silent row loss when multiple source
rows match the same target — raises ValueError instead of letting
the _ROW_ID filter silently drop the second source row.

Check duplicate _ROW_ID after condition filtering, not before.
This preserves the single-clause behavior where condition filters
duplicate sources down to one (test_duplicate_source_filtered_by_condition).

Also add test_multi_not_matched_null_falls_through to verify
NULL condition rows correctly fall through to the next clause.
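Why a plain NOT(condition) loses NULL rows can be shown with a small model of SQL three-valued logic. The sketch below uses Python's None for NULL; `sql_not`, `coalesce`, and `keeps` are illustrative helpers, not functions from the PR.

```python
from typing import Optional


def sql_not(value: Optional[bool]) -> Optional[bool]:
    """SQL three-valued NOT: NOT NULL is NULL."""
    return None if value is None else not value


def coalesce(*values):
    """Return the first non-NULL (non-None) argument, like SQL COALESCE."""
    for v in values:
        if v is not None:
            return v
    return None


def keeps(predicate_result: Optional[bool]) -> bool:
    """A filter keeps a row only when the predicate is TRUE."""
    return predicate_result is True


# Condition outcomes for three rows: matches, does not match, evaluates to NULL.
cond_results = [True, False, None]

taken = [keeps(c) for c in cond_results]
remaining_plain = [keeps(sql_not(c)) for c in cond_results]
remaining_coalesce = [keeps(coalesce(sql_not(c), True)) for c in cond_results]
```

With plain NOT, the NULL row is neither taken nor remaining, so it silently disappears. Wrapping the negation in COALESCE(..., TRUE) keeps it in the remaining set, where the next clause can evaluate it.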

When multiple clauses are present, check for duplicate t._ROW_ID
on the entire batch before clause iteration. This prevents the
_ROW_ID filter from silently dropping the second source row across
clauses.

Single-clause path is unaffected (still allows condition to filter
duplicates down to one).

Add test_multi_clause_duplicate_source_raises: two source rows
matching the same target with multi-clause should raise.

The batch-level check was too strict: it rejected duplicate source
rows even when only one was actionable after condition filtering.
It also depended on Ray batch boundaries (inconsistent behavior).

Per-clause duplicate check remains: if a single clause's matched
rows contain duplicate _ROW_ID, that's a real error.
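A per-clause duplicate check of this kind might look like the sketch below. It is an assumption-laden illustration: the function name, its signature, and the error wording are hypothetical, and the real code works on Arrow columns rather than Python lists.

```python
from collections import Counter


def check_no_duplicate_row_ids(row_ids, clause_index):
    """Raise ValueError if any target _ROW_ID appears more than once."""
    dupes = sorted(rid for rid, n in Counter(row_ids).items() if n > 1)
    if dupes:
        raise ValueError(
            f"clause {clause_index}: multiple source rows match "
            f"target _ROW_ID(s) {dupes}"
        )


# Distinct ids pass silently.
check_no_duplicate_row_ids([0, 1, 2], clause_index=0)

# Two source rows claimed by the same clause for target row 1 are an error.
try:
    check_no_duplicate_row_ids([0, 1, 1], clause_index=0)
    raised = False
except ValueError as exc:
    raised = True
    message = str(exc)
```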

Replace test with: two source rows match same target, only one
satisfies any clause condition → succeeds, target updated once.

Replace _ROW_ID-based filtering with COALESCE(NOT(condition), TRUE)
in the matched path. This ensures duplicate source rows that satisfy
different clauses both produce output and get caught by the downstream
distributed_update_apply duplicate check, regardless of Ray batch
boundaries.

Add test: two source rows both actionable by different clauses
should raise "multiple source rows" from the global check.
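The difference between the two ways of computing the remaining rows can be sketched in pure Python. Both functions are hypothetical simplifications of the matched path: `split_by_row_id` mimics the earlier _ROW_ID-based filter, and `split_by_condition` mimics the COALESCE(NOT(condition), TRUE) filter, expressed here as "the condition was not True".

```python
def split_by_row_id(rows, clauses):
    """Earlier approach: drop every row whose _ROW_ID was already taken."""
    out, remaining = [], list(rows)
    for cond in clauses:
        taken = [r for r in remaining if cond(r) is True]
        taken_ids = {r["_ROW_ID"] for r in taken}
        out.extend(taken)
        remaining = [r for r in remaining if r["_ROW_ID"] not in taken_ids]
    return out


def split_by_condition(rows, clauses):
    """Later approach: keep rows whose condition was False or NULL."""
    out, remaining = [], list(rows)
    for cond in clauses:
        out.extend(r for r in remaining if cond(r) is True)
        remaining = [r for r in remaining if cond(r) is not True]
    return out


# Two source rows joined to the same target row (_ROW_ID 7).
rows = [{"_ROW_ID": 7, "age": 30}, {"_ROW_ID": 7, "age": 15}]
clauses = [lambda r: r["age"] > 18, lambda r: r["age"] > 10]

old_ids = [r["_ROW_ID"] for r in split_by_row_id(rows, clauses)]
new_ids = [r["_ROW_ID"] for r in split_by_condition(rows, clauses)]
```

In this model the _ROW_ID-based filter emits a single row for target 7 and discards the second source row, so nothing downstream can notice the conflict. The condition-based filter emits both rows, which leaves the duplicate visible to a later global check.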
@XiaoHongbo-Hope marked this pull request as ready for review on June 4, 2026, 08:46
Contributor

@QuakeWang left a comment

LGTM

captured_schema,
))
if rewritten is not None and matched.num_rows < remaining.num_rows:
    not_cond = f"COALESCE(NOT ({rewritten}), TRUE)"

Non-blocking test gap: the not-matched path covers NULL condition fall-through, but the matched path relies on the same COALESCE(NOT (...), TRUE) behavior here. Consider adding a matched-path NULL fall-through test to lock this down.

@JingsongLi
Contributor

Thanks for adding multi-clause fall-through support. The overall evaluation order looks aligned with MERGE INTO semantics: clauses are evaluated in order, the first matching clause wins, and NULL conditions fall through.

One SQL-compatibility gap still needs to be handled before merging: a non-last clause without a condition should be rejected. Spark SQL rejects MERGE INTO statements where any non-last WHEN MATCHED / WHEN NOT MATCHED clause omits its condition; only the last clause of each clause kind may be unconditional.

This PR currently accepts the equivalent Python API shape, and the implementation will silently make later clauses unreachable because the unconditional clause consumes all remaining rows. For example:

when_matched=[
    WhenMatched(update="*"),
    WhenMatched(update={"age": "s.age"}, condition="s.age > 10"),
]

The second clause can never run, while the corresponding SQL form would fail during parsing/analysis. Could we add validation in _prepare for both when_matched and when_not_matched, plus negative tests for unconditional non-last clauses? That would make the Python API behavior match the supported MERGE INTO syntax more closely.

Spark SQL requires that only the last WHEN MATCHED / WHEN NOT MATCHED
clause may omit its condition. Add the same validation in _prepare so
the Python API rejects unreachable clauses early.
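A validation along these lines could be sketched as follows. The `Clause` dataclass, the `validate_clause_order` helper, and the error message are hypothetical; the actual check lives in `_prepare` and its exact form is not shown in this conversation.

```python
from dataclasses import dataclass
from typing import Optional, Sequence


@dataclass
class Clause:
    """Hypothetical stand-in; only the `condition` attribute matters here."""
    condition: Optional[str] = None


def validate_clause_order(clauses: Sequence[Clause], kind: str) -> None:
    """Reject any unconditional clause that is not the last of its kind."""
    for index, clause in enumerate(clauses[:-1]):
        if clause.condition is None:
            raise ValueError(
                f"{kind} clause {index} has no condition; only the last "
                f"{kind} clause may omit it"
            )


# Accepted: the unconditional clause comes last.
validate_clause_order([Clause("s.age > 10"), Clause()], "when_matched")

# Rejected: the unconditional clause would make the second one unreachable.
try:
    validate_clause_order([Clause(), Clause("s.age > 10")], "when_matched")
    rejected = False
except ValueError:
    rejected = True
```

Running the same check once for `when_matched` and once for `when_not_matched` would cover both clause kinds mentioned in the review.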
@XiaoHongbo-Hope
Contributor Author

Thanks, updated


3 participants